
source('scripts/cleaning_utils.r')

# Open one raw pipe-delimited extract and reduce it to the renamed columns
# used by the rest of this script.
#
# path:   location of the delimited text file, handed to open_dataset().
# schema: arrow schema describing the file's columns; the first line of the
#         file is skipped (skip = 1), so column names come from this schema.
#
# Returns the transformed dataset grouped by `fips_st`, the first two
# characters of the zero-padded `fips` code.
clean_path = function(path, schema){
  raw = open_dataset(
    path,
    format = 'text',
    delim = '|',
    quote = '',
    schema = schema,
    skip = 1,
    check_utf8 = FALSE
  )

  renamed = transmute(
    raw,

    # identifiers
    fips = str_pad(as.character(`FIPS CODE`), width = 5, side = 'left', pad = '0'),
    parcel_id = `COMPOSITE PROPERTY LINKAGE KEY`,

    census_id = `CENSUS ID`,

    # parcel coordinates
    site_lat = `PARCEL LEVEL LATITUDE`,
    site_long = `PARCEL LEVEL LONGITUDE`,

    # situs (property) address
    site_address = `SITUS STREET ADDRESS`,
    site_city = `SITUS CITY`,
    site_zip = `SITUS ZIP CODE`,
    site_state = `SITUS STATE`,

    # mailing address
    mail_address = `MAILING STREET ADDRESS`,
    mail_city = `MAILING CITY`,
    mail_zip = `MAILING ZIP CODE`,
    mail_state = `MAILING STATE`,

    # first listed owner
    buyer1_full = `OWNER 1 FULL NAME`,
    buyer1_last = `OWNER 1 LAST NAME`,
    buyer1_fm = `OWNER 1 FIRST NAME & MIDDLE INITIAL`,
    buyer1_corp = `OWNER 1 CORPORATE INDICATOR`,

    # second listed owner
    buyer2_full = `OWNER 2 FULL NAME`,
    buyer2_last = `OWNER 2 LAST NAME`,
    buyer2_fm = `OWNER 2 FIRST NAME & MIDDLE INITIAL`,
    buyer2_corp = `OWNER 2 CORPORATE INDICATOR`,

    # valuation fields
    calc_total_value = `TOTAL VALUE CALCULATED`,
    calc_value_source = `CALCULATED VALUE SOURCE CODE`,

    assd_total_value = `ASSESSED TOTAL VALUE`,
    mark_total_value = `MARKET TOTAL VALUE`,
    appr_total_value = `APPRAISED TOTAL VALUE`,

    # tax fields
    tax_amt = `TAX AMOUNT`,
    tax_year = `TAX YEAR`,
    assd_year = `ASSESSED YEAR`,

    begin_year = `BEGINNING TAX YEAR DATE`,
    end_year = `ENDING TAX YEAR DATE`,

    # classification codes
    property_type = `PROPERTY INDICATOR CODE`,
    occupancy = `OWNER OCCUPANCY CODE`
  )

  # The state portion of the FIPS code becomes the grouping key.
  with_state = mutate(renamed, fips_st = str_sub(fips, 1, 2))
  group_by(with_state, fips_st)
}

# Pull county-level data --------------------------------------------------

# Lookup table built from the first two columns of national_county.txt,
# named `state` and `fips_st`, with duplicate pairs removed.
counties = distinct(
  read_delim(
    'national_county.txt',
    col_names = c('state', 'fips_st'),
    col_select = 1:2
  )
)

# Pull corelogic raw ------------------------------------------------------

# Inventory of raw extracts; the loop below uses its `path` and `year` columns.
files = read_csv('cl/cl_inventory.csv')

# One pass per inventory ROW. `length()` of a data frame is its number of
# columns, so the rows are indexed with seq_len(nrow(.)) instead; this also
# yields an empty loop when the inventory has no rows.
for(i in seq_len(nrow(files))){
  # Read a single data row just to recover the column names, cast every
  # non-character column to character, and take the resulting arrow schema.
  header_schema = files$path[i] %>%
    fread(nrows = 1) %>%
    mutate(across(!where(is.character), as.character)) %>%
    as.data.table() %>%
    arrow_table() %>%
    {
      .$schema
    }

  # clean_path() returns the data grouped by fips_st; the output lands under
  # a vintage=<year> directory (the cleaning loop later reads
  # vintage=<year>/fips_st=<code> sub-directories from here).
  clean_path(files$path[i], header_schema) %>%
    write_dataset(glue('data/cl/vintage={files$year[i]}'), max_rows_per_file = 5e5)
}

# Clean data --------------------------------------------------------------

# Enumerate every (vintage, fips_st) combination present under data/cl and
# attach the state abbreviation for each state FIPS code.
# NOTE(review): the original called `open_datasets`, which arrow does not
# export; `open_dataset` is used here. Confirm cleaning_utils.r does not
# define a helper of that name.
iter = open_dataset('data/cl') %>%
  distinct(vintage, fips_st) %>%
  collect() %>%
  # Partition discovery may read all-digit keys such as "01" as integers;
  # restore the two-character zero-padded string so it matches both `counties`
  # and the fips_st=<code> directory names on disk.
  mutate(fips_st = str_pad(as.character(fips_st), 2, 'left', '0')) %>%
  left_join(counties, by = 'fips_st') # adds state col

# For each (vintage, state FIPS) row of `iter`: load the raw partition, clean
# names and addresses with helpers that are not defined in this file
# (presumably from scripts/cleaning_utils.r), reshape to one row per
# owner x address type, and write the result under data/cl_clean.
# seq_len() keeps the loop from running when `iter` has no rows.
for(i in seq_len(nrow(iter))){

  # `iter` has columns vintage / fips_st / state -- there is no `year` column,
  # so the year of the extract is taken from `vintage`.
  this_fips = iter$fips_st[i]; this_state = iter$state[i]; this_year = iter$vintage[i]
  print(glue('Working in {this_fips} {this_state} {this_year}...'))

  x = open_dataset(glue('data/cl/vintage={this_year}/fips_st={this_fips}')) %>%
    as.data.table()

  # Overwrite the situs state with the state looked up from the FIPS code.
  x[, site_state := this_state]
  x[, row_n := .I]

  # Random chunk labels (1..200) used to fan work out over mclapply. They are
  # sorted, so split() yields contiguous runs and unlist() restores row order.
  core = sort(sample(1:200, nrow(x), replace = TRUE))

  # Row id of the form cl-<2-digit year>-<state fips>-<row number>.
  x[,cl_row_id := split(x$row_n, core) %>% mclapply(function(x) str_c(glue('cl-{str_sub(this_year, 3, 4)}-{this_fips}-'), x), mc.cores = 10) %>% unlist()]

  cols = c('buyer1_full', 'buyer1_last', 'buyer2_full', 'buyer2_last')
  x[, (cols) := mclapply(.SD, fix_last_names, mc.cores = length(cols)), .SDcols = (cols)]

  # parse_trust_20 receives the buyer1_full..buyer2_corp columns per chunk and
  # its output is assigned to the four last/first-middle name columns.
  x[,c('buyer1_last', 'buyer1_fm', 'buyer2_last', 'buyer2_fm') := split(x[,buyer1_full:buyer2_corp], core) %>% mclapply(parse_trust_20, mc.cores = 10) %>% rbindlist() %>% as.list()]

  x[,site_po := split(x$site_address, core) %>% mclapply(po_box_parser, mc.cores = 10) %>% unlist()]
  x[,mail_po := split(x$mail_address, core) %>% mclapply(po_box_parser, mc.cores = 10) %>% unlist()]

  x[,site_zip := split(x$site_zip, core) %>% mclapply(normalize_zip, mc.cores = 10) %>% unlist()]
  x[,mail_zip := split(x$mail_zip, core) %>% mclapply(normalize_zip, mc.cores = 10) %>% unlist()]

  # Keep the street address only where no PO box value was parsed.
  x[,site_address := case_when(is.na(site_po) ~ site_address, TRUE ~ NA_character_) %>% as.character()]
  x[,mail_address := case_when(is.na(mail_po) ~ mail_address, TRUE ~ NA_character_) %>% as.character()]

  # Lower-case, squish whitespace, and turn empty strings into NA.
  cols = c('site_address', 'site_city', 'site_state', 'mail_address', 'mail_city', 'mail_state')
  x[, (cols) := mclapply(.SD, function(x) na_if(str_squish(str_to_lower(x)), ''), mc.cores = length(cols)), .SDcols = (cols)]

  cols = c('buyer1_last', 'buyer1_fm', 'buyer2_last', 'buyer2_fm')
  x[, (cols) := mclapply(.SD, function(x) na_if(str_squish(str_to_lower(x)), ''), mc.cores = length(cols)), .SDcols = (cols)]

  # Code columns: only empty strings become NA (case is left untouched).
  cols = c('buyer1_corp', 'buyer2_corp', 'property_type', 'occupancy', 'calc_value_source', 'census_id')
  x[, (cols) := mclapply(.SD, function(x) na_if(x, ''), mc.cores = length(cols)), .SDcols = (cols)]

  x[,site_address := split(x$site_address, core) %>% mclapply(usps_address, mc.cores = 10) %>% unlist()]
  x[,mail_address := split(x$mail_address, core) %>% mclapply(usps_address, mc.cores = 10) %>% unlist()]

  # Blank the mailing address when it duplicates the situs address.
  x[,mail_address := case_when(mail_address == site_address ~ NA_character_, TRUE ~ mail_address)]

  # Stack four slices: (owner 1, owner 2) x (situs address, mailing address).
  # Each slice keeps rows where
  #   - the corporate indicator is missing, or is "Y" but a first name exists;
  #   - at least one of last / first-middle name is present;
  #   - the relevant state is DC or one of state.abb (lower-cased);
  #   - a street address or a PO box value is present.
  out = rbindlist(list(
    x[(is.na(buyer1_corp) | ((!is.na(buyer1_fm)) & buyer1_corp == "Y")) &
        (!(is.na(buyer1_last) & is.na(buyer1_fm))) &
        (site_state %chin% tolower(c('DC', state.abb))) &
        (!(is.na(site_address) & is.na(site_po)))
    ][
      , .(cl_row_id, last = buyer1_last, first_m = buyer1_fm, address = site_address,
          po_num = site_po, city = site_city, zip = site_zip, state = site_state,
          address_type = 's', person = '1',
          fips, parcel_id, census_id, site_lat, site_long,
          calc_total_value, calc_value_source,
          assd_total_value, mark_total_value, appr_total_value,
          tax_amt, tax_year, assd_year, begin_year, end_year,
          property_type, occupancy)
    ],
    x[(is.na(buyer2_corp) | ((!is.na(buyer2_fm)) & buyer2_corp == "Y")) &
        (!(is.na(buyer2_last) & is.na(buyer2_fm))) &
        (site_state %chin% tolower(c('DC', state.abb))) &
        (!(is.na(site_address) & is.na(site_po)))
    ][
      , .(cl_row_id, last = buyer2_last, first_m = buyer2_fm, address = site_address,
          po_num = site_po, city = site_city, zip = site_zip, state = site_state,
          address_type = 's', person = '2',
          fips, parcel_id, census_id, site_lat, site_long,
          calc_total_value, calc_value_source,
          assd_total_value, mark_total_value, appr_total_value,
          tax_amt, tax_year, assd_year, begin_year, end_year,
          property_type, occupancy)
    ],
    x[(is.na(buyer1_corp) | ((!is.na(buyer1_fm)) & buyer1_corp == "Y")) &
        (!(is.na(buyer1_last) & is.na(buyer1_fm))) &
        (mail_state %chin% tolower(c('DC', state.abb))) &
        (!(is.na(mail_address) & is.na(mail_po)))
    ][
      , .(cl_row_id, last = buyer1_last, first_m = buyer1_fm, address = mail_address,
          po_num = mail_po, city = mail_city, zip = mail_zip, state = mail_state,
          address_type = 'm', person = '1',
          fips, parcel_id, census_id, site_lat, site_long,
          calc_total_value, calc_value_source,
          assd_total_value, mark_total_value, appr_total_value,
          tax_amt, tax_year, assd_year, begin_year, end_year,
          property_type, occupancy)
    ],
    x[(is.na(buyer2_corp) | ((!is.na(buyer2_fm)) & buyer2_corp == "Y")) &
        (!(is.na(buyer2_last) & is.na(buyer2_fm))) &
        (mail_state %chin% tolower(c('DC', state.abb))) &
        (!(is.na(mail_address) & is.na(mail_po)))
    ][
      , .(cl_row_id, last = buyer2_last, first_m = buyer2_fm, address = mail_address,
          po_num = mail_po, city = mail_city, zip = mail_zip, state = mail_state,
          address_type = 'm', person = '2',
          fips, parcel_id, census_id, site_lat, site_long,
          calc_total_value, calc_value_source,
          assd_total_value, mark_total_value, appr_total_value,
          tax_amt, tax_year, assd_year, begin_year, end_year,
          property_type, occupancy)
    ]
  ))

  # Unique id per output row: <cl_row_id>-<person>-<address_type>.
  out[, uid := str_c(cl_row_id, '-', person, '-', address_type)]
  out[, year := this_year]

  # Fresh chunk labels sized to the stacked table.
  core = sort(sample(1:200, nrow(out), replace = TRUE))

  # First whitespace-delimited token of the first-middle and last name fields.
  out[, first_only := split(out$first_m, core) %>% mclapply(function(x) str_remove(x, '\\s.*'), mc.cores = 10) %>% unlist()]
  out[, last_only := split(out$last, core) %>% mclapply(function(x) str_remove(x, '\\s.*'), mc.cores = 10) %>% unlist()]

  # Drop placeholder rows named "record owner".
  # NOTE(review): when the condition evaluates to NA (e.g. last == 'owner' with
  # a missing first name) data.table also drops the row -- confirm intended.
  out = out[!(last == 'owner' & first_only == 'record'),]

  # Year-like fields: keep the first four characters as an integer.
  cols = c('tax_year', 'assd_year', 'begin_year', 'end_year')
  out[, (cols) := mclapply(.SD, function(x) as.integer(str_sub(x, 1, 4)), mc.cores = length(cols)), .SDcols = cols]

  cols = c('tax_amt', 'calc_total_value', 'assd_total_value', 'mark_total_value', 'appr_total_value', 'site_lat', 'site_long')
  out[, (cols) := mclapply(.SD, as.double, mc.cores = length(cols)), .SDcols = cols]

  # `names` is not defined in this file -- presumably a first-name lookup with
  # `first_name` and `gender` columns from cleaning_utils.r; TODO confirm.
  # gender is recoded as F -> 2, M -> 0, anything else (incl. no match) -> 1.
  arrow_table(out) %>%
    left_join(names,
              by = c('first_only' = 'first_name'),
              copy = TRUE) %>%
    mutate(gender = case_when(gender == 'F' ~ 2L, gender == 'M' ~ 0L, TRUE ~ 1L)) %>%
    write_dataset(
      glue('data/cl_clean/vintage={this_year}/fips_st={this_fips}'),
      max_rows_per_file = 5e5,
      partitioning = 'state'
    )
  
}

# repartition -- optional for the workflow, but if you have space:
# Rewrite the cleaned data one state at a time: rows whose `state` equals the
# lower-case code go to data/cl_in/<code>, partitioned by `vintage`.
for(s in tolower(sort(c('dc', state.abb)))){
  print(glue('Working in {s}...'))

  write_dataset(
    filter(open_dataset('data/cl_clean'), state == s),
    glue('data/cl_in/{s}'),
    max_rows_per_file = 5e5,
    partitioning = 'vintage'
  )

}

